package storage

import (
	"errors"
	bolt "gitee.com/wanttobeamaster/bbolt"
	"gitee.com/wanttobeamaster/gridbase/pkg/pb/raftcmdpb"
	"gitee.com/wanttobeamaster/gridbase/pkg/util"
	"strconv"
)

// HashObj is the metadata record stored alongside a hash key.
type HashObj struct {
	// Size is the field counter of the hash: the write paths in this file
	// increment it when a new field is inserted and decrement it when fields
	// are deleted.
	Size uint64
}

// MarshalHashObj serializes obj into a freshly allocated 8-byte buffer by
// handing obj.Size to util.Uint64ToBytes1.
func MarshalHashObj(obj *HashObj) []byte {
	buf := make([]byte, 8)
	util.Uint64ToBytes1(buf, obj.Size)
	return buf
}

// UnmarshalHashObj decodes the 8-byte representation produced by
// MarshalHashObj.
//
// It returns an error when raw is not exactly 8 bytes long or when the
// underlying integer decoding fails, so that a malformed record is never
// silently reported as "no object, no error".
func UnmarshalHashObj(raw []byte) (*HashObj, error) {
	if len(raw) != 8 {
		return nil, errors.New("invalid hash meta value: got " + strconv.Itoa(len(raw)) + " bytes, want 8")
	}
	size, err := util.BytesToUint64(raw)
	if err != nil {
		return nil, err
	}
	return &HashObj{Size: size}, nil
}

// HashMetaObj loads the metadata record of the hash stored under key.
//
// It returns (nil, nil) when no metadata entry exists for key. Any error from
// the read transaction or from decoding the stored bytes is returned to the
// caller.
func (e *boltHashEngine) HashMetaObj(key []byte) (*HashObj, error) {
	metaKey := RawKeyPrefix(key)

	var obj *HashObj
	err := e.db.View(func(tx *bolt.Tx) error {
		raw := tx.Bucket([]byte("boltHashBucket")).Get(metaKey)
		if raw == nil {
			return nil
		}
		// Decode inside the transaction: bolt only guarantees that the slice
		// returned by Get stays valid while the transaction is open.
		var decodeErr error
		obj, decodeErr = UnmarshalHashObj(raw)
		return decodeErr
	})
	if err != nil {
		return nil, err
	}
	return obj, nil
}

// newHashMetaObj returns a fresh metadata record whose Size is zero.
func (e *boltHashEngine) newHashMetaObj() *HashObj {
	return &HashObj{}
}

// RawHashDataKey builds the storage key of a single hash field: the result of
// RawKeyPrefix(key), followed by util.DataTypeKey, followed by field.
func (e *boltHashEngine) RawHashDataKey(key, field []byte) []byte {
	withType := append(RawKeyPrefix(key), util.DataTypeKey)
	return append(withType, field...)
}

// boltHashEngine implements hash operations on top of a bolt database. All
// entries are kept in the "boltHashBucket" bucket.
type boltHashEngine struct {
	// db is the bolt database handle every method opens its transactions on.
	db      *bolt.DB
	// limiter is initialised from LimitConcurrencyWrite by newBoltHashEngine;
	// none of the methods visible in this file reference it.
	limiter util.Limiter
}

// newBoltHashEngine makes sure the "boltHashBucket" bucket exists in db and
// returns a hash engine backed by it.
//
// NOTE(review): the error returned by db.Update is discarded because the
// signature has no way to report it; if bucket creation fails, later calls
// will find no bucket.
func newBoltHashEngine(db *bolt.DB) HashEngine {
	_ = db.Update(func(tx *bolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists([]byte("boltHashBucket"))
		return err
	})

	engine := &boltHashEngine{
		db:      db,
		limiter: *util.NewLimiter(LimitConcurrencyWrite),
	}
	return engine
}

//TODO : implements HashEngine functions

// HSet stores value under field of the hash at key.
//
// It returns 1 when the field was newly created and 0 when an existing field
// was overwritten. ErrKeyOrFieldEmpty is returned when key, field or value is
// empty.
//
// The metadata record is read and updated inside the same write transaction
// as the field itself, so the size counter cannot be computed from a stale
// snapshot when several writers touch the same hash.
func (e *boltHashEngine) HSet(key, field, value []byte) (int64, error) {
	if len(key) == 0 || len(field) == 0 || len(value) == 0 {
		return 0, ErrKeyOrFieldEmpty
	}

	metaKey := RawKeyPrefix(key)
	dataKey := e.RawHashDataKey(key, field)

	var added int64
	err := e.db.Update(func(tx *bolt.Tx) error {
		added = 0
		b := tx.Bucket([]byte("boltHashBucket"))

		if b.Get(dataKey) == nil {
			// New field: load the current metadata (or start from an empty
			// record) and bump the size counter.
			meta := e.newHashMetaObj()
			if raw := b.Get(metaKey); raw != nil {
				stored, err := UnmarshalHashObj(raw)
				if err != nil {
					return err
				}
				if stored != nil {
					meta = stored
				}
			}
			meta.Size++
			if err := b.Put(metaKey, MarshalHashObj(meta)); err != nil {
				return err
			}
			added = 1
		}

		// Insert or overwrite the field value.
		return b.Put(dataKey, value)
	})
	if err != nil {
		return 0, err
	}
	return added, nil
}

// HGet returns the value stored under field of the hash at key.
//
// It returns (nil, nil) when the hash has no metadata record or the field
// does not exist, and ErrKeyOrFieldEmpty when key or field is empty. The
// returned slice is a private copy that the caller may keep and modify.
func (e *boltHashEngine) HGet(key, field []byte) ([]byte, error) {
	if len(key) == 0 || len(field) == 0 {
		return nil, ErrKeyOrFieldEmpty
	}

	metaObj, err := e.HashMetaObj(key)
	if err != nil {
		return nil, err
	}
	if metaObj == nil {
		return nil, nil
	}

	dataKey := e.RawHashDataKey(key, field)

	var value []byte
	err = e.db.View(func(tx *bolt.Tx) error {
		v := tx.Bucket([]byte("boltHashBucket")).Get(dataKey)
		if v == nil {
			return nil
		}
		// bolt only guarantees the slice returned by Get while the
		// transaction is open, so copy it before the transaction ends.
		value = make([]byte, len(v))
		copy(value, v)
		return nil
	})
	if err != nil {
		return nil, err
	}
	return value, nil
}

// HDel removes the given fields from the hash at key and returns how many of
// them actually existed.
//
// ErrKeyOrFieldEmpty is returned when key is empty or no fields are given.
// When the last field is removed the metadata record is deleted as well.
//
// The function passed to Batch may be executed more than once, so it resets
// its counter on entry and reads the metadata inside the transaction instead
// of mutating state captured from an earlier read.
func (e *boltHashEngine) HDel(key []byte, fields ...[]byte) (int64, error) {
	if len(key) == 0 || len(fields) == 0 {
		return 0, ErrKeyOrFieldEmpty
	}

	metaKey := RawKeyPrefix(key)

	var delCnt uint64
	err := e.db.Batch(func(tx *bolt.Tx) error {
		delCnt = 0
		b := tx.Bucket([]byte("boltHashBucket"))

		raw := b.Get(metaKey)
		if raw == nil {
			return nil
		}
		meta, err := UnmarshalHashObj(raw)
		if err != nil {
			return err
		}
		if meta == nil || meta.Size == 0 {
			return nil
		}

		for _, field := range fields {
			dataKey := e.RawHashDataKey(key, field)
			if b.Get(dataKey) == nil {
				continue
			}
			if err := b.Delete(dataKey); err != nil {
				return err
			}
			delCnt++
		}
		if delCnt == 0 {
			// Nothing was removed, so the metadata is already correct.
			return nil
		}

		// Compare before subtracting: Size is unsigned and must not wrap
		// around if the counter is lower than the number of deleted fields.
		if delCnt >= meta.Size {
			return b.Delete(metaKey)
		}
		meta.Size -= delCnt
		return b.Put(metaKey, MarshalHashObj(meta))
	})
	if err != nil {
		return 0, err
	}
	return int64(delCnt), nil
}

// HExists reports whether field is present in the hash at key.
//
// It returns false when the hash has no metadata record or when the field
// itself has no stored entry.
func (e *boltHashEngine) HExists(key, field []byte) (bool, error) {
	metaObj, err := e.HashMetaObj(key)
	if err != nil {
		return false, err
	}
	if metaObj == nil {
		return false, nil
	}

	// The hash existing is not enough: look up the entry of this field.
	dataKey := e.RawHashDataKey(key, field)
	found := false
	err = e.db.View(func(tx *bolt.Tx) error {
		found = tx.Bucket([]byte("boltHashBucket")).Get(dataKey) != nil
		return nil
	})
	if err != nil {
		return false, err
	}
	return found, nil
}

// HKeys returns the field names of the hash at key.
//
// It returns ErrEmptyKey for an empty key and (nil, nil) when the hash has no
// metadata record. At most metaObj.Size names are returned, and the scan
// stops as soon as a stored key no longer belongs to this hash. Every
// returned name is a private copy.
func (e *boltHashEngine) HKeys(key []byte) ([][]byte, error) {
	if len(key) == 0 {
		return nil, ErrEmptyKey
	}

	metaObj, err := e.HashMetaObj(key)
	if err != nil {
		return nil, err
	}
	if metaObj == nil {
		return nil, nil
	}

	// Every field of this hash is stored under prefix+field.
	prefix := e.RawHashDataKey(key, nil)

	names := [][]byte{}
	err = e.db.View(func(tx *bolt.Tx) error {
		c := tx.Bucket([]byte("boltHashBucket")).Cursor()
		remaining := metaObj.Size
		for k, _ := c.Seek(prefix); remaining > 0 && k != nil; k, _ = c.Next() {
			if len(k) < len(prefix) || string(k[:len(prefix)]) != string(prefix) {
				// Walked past the last field of this hash.
				break
			}
			// Cursor keys are only guaranteed while the transaction is open,
			// so copy the field name out.
			name := make([]byte, len(k)-len(prefix))
			copy(name, k[len(prefix):])
			names = append(names, name)
			remaining--
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return names, nil
}

// HVals returns the values of all fields of the hash at key.
//
// It returns ErrEmptyKey for an empty key and (nil, nil) when the hash has no
// metadata record. At most metaObj.Size values are returned, and the scan
// stops as soon as a stored key no longer belongs to this hash. Every
// returned value is a private copy.
func (e *boltHashEngine) HVals(key []byte) ([][]byte, error) {
	if len(key) == 0 {
		return nil, ErrEmptyKey
	}

	metaObj, err := e.HashMetaObj(key)
	if err != nil {
		return nil, err
	}
	if metaObj == nil {
		return nil, nil
	}

	// Every field of this hash is stored under prefix+field.
	prefix := e.RawHashDataKey(key, nil)

	var vals [][]byte
	err = e.db.View(func(tx *bolt.Tx) error {
		c := tx.Bucket([]byte("boltHashBucket")).Cursor()
		remaining := metaObj.Size
		for k, v := c.Seek(prefix); remaining > 0 && k != nil; k, v = c.Next() {
			if len(k) < len(prefix) || string(k[:len(prefix)]) != string(prefix) {
				// Walked past the last field of this hash.
				break
			}
			// Cursor values are only guaranteed while the transaction is
			// open, so copy them out.
			val := make([]byte, len(v))
			copy(val, v)
			vals = append(vals, val)
			remaining--
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return vals, nil
}

// HGetAll returns every field/value pair of the hash at key.
//
// It returns ErrEmptyKey for an empty key and (nil, nil) when the hash has no
// metadata record. At most metaObj.Size pairs are returned, and the scan
// stops as soon as a stored key no longer belongs to this hash. Field and
// Value of every pair are private copies.
func (e *boltHashEngine) HGetAll(key []byte) ([]*raftcmdpb.FVPair, error) {
	if len(key) == 0 {
		return nil, ErrEmptyKey
	}

	metaObj, err := e.HashMetaObj(key)
	if err != nil {
		return nil, err
	}
	if metaObj == nil {
		return nil, nil
	}

	// Every field of this hash is stored under prefix+field.
	prefix := e.RawHashDataKey(key, nil)

	var kvs []*raftcmdpb.FVPair
	err = e.db.View(func(tx *bolt.Tx) error {
		c := tx.Bucket([]byte("boltHashBucket")).Cursor()
		remaining := metaObj.Size
		for k, v := c.Seek(prefix); remaining > 0 && k != nil; k, v = c.Next() {
			if len(k) < len(prefix) || string(k[:len(prefix)]) != string(prefix) {
				// Walked past the last field of this hash.
				break
			}
			// Cursor keys and values are only guaranteed while the
			// transaction is open, so copy both out.
			field := make([]byte, len(k)-len(prefix))
			copy(field, k[len(prefix):])
			value := make([]byte, len(v))
			copy(value, v)
			kvs = append(kvs, &raftcmdpb.FVPair{
				Field: field,
				Value: value,
			})
			remaining--
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return kvs, nil
}

// HScanGet returns up to count field/value pairs of the hash at key, starting
// at the first field that is greater than or equal to start in key order.
//
// It returns ErrEmptyKey for an empty key and (nil, nil) when the hash has no
// metadata record. The scan never leaves the fields of this hash, even when
// count is larger than the number of remaining fields. Field and Value of
// every pair are private copies.
func (e *boltHashEngine) HScanGet(key, start []byte, count int) ([]*raftcmdpb.FVPair, error) {
	if len(key) == 0 {
		return nil, ErrEmptyKey
	}

	metaObj, err := e.HashMetaObj(key)
	if err != nil {
		return nil, err
	}
	if metaObj == nil {
		return nil, nil
	}

	// prefix is shared by every field of this hash; startKey is where the
	// cursor is positioned first.
	prefix := e.RawHashDataKey(key, nil)
	startKey := e.RawHashDataKey(key, start)

	var kvs []*raftcmdpb.FVPair
	err = e.db.View(func(tx *bolt.Tx) error {
		c := tx.Bucket([]byte("boltHashBucket")).Cursor()
		remaining := count
		for k, v := c.Seek(startKey); remaining > 0 && k != nil; k, v = c.Next() {
			if len(k) < len(prefix) || string(k[:len(prefix)]) != string(prefix) {
				// Walked past the last field of this hash.
				break
			}
			// Cursor keys and values are only guaranteed while the
			// transaction is open, so copy both out.
			field := make([]byte, len(k)-len(prefix))
			copy(field, k[len(prefix):])
			value := make([]byte, len(v))
			copy(value, v)
			kvs = append(kvs, &raftcmdpb.FVPair{
				Field: field,
				Value: value,
			})
			remaining--
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return kvs, nil
}

// HLen returns the Size recorded in the metadata of the hash at key, or 0
// when no metadata record exists. An empty key yields ErrEmptyKey.
func (e *boltHashEngine) HLen(key []byte) (int64, error) {
	if len(key) == 0 {
		return 0, ErrEmptyKey
	}

	switch meta, err := e.HashMetaObj(key); {
	case err != nil:
		return 0, err
	case meta == nil:
		return 0, nil
	default:
		return int64(meta.Size), nil
	}
}

// HMGet returns the values of the given fields of the hash at key, in the
// same order as fields.
//
// It returns (nil, nil) when key is empty or no fields are given. A field
// that does not exist yields a nil entry, whether or not the hash itself
// exists. When the metadata lookup or the read transaction fails, the values
// are nil and the failure is returned as the single element of the error
// slice.
func (e *boltHashEngine) HMGet(key []byte, fields ...[]byte) ([][]byte, []error) {
	if len(key) == 0 || len(fields) == 0 {
		return nil, nil
	}

	metaObj, err := e.HashMetaObj(key)
	if err != nil {
		return nil, []error{err}
	}

	ret := make([][]byte, len(fields))
	if metaObj == nil {
		return ret, nil
	}

	err = e.db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("boltHashBucket"))
		for i := range fields {
			val := b.Get(e.RawHashDataKey(key, fields[i]))
			if val == nil {
				// Missing field: leave the entry nil.
				continue
			}
			// Copy: the slice returned by Get is only guaranteed while the
			// transaction is open.
			ret[i] = make([]byte, len(val))
			copy(ret[i], val)
		}
		return nil
	})
	if err != nil {
		return nil, []error{err}
	}
	return ret, nil
}

// HMSet stores every fields[i]/values[i] pair in the hash at key and updates
// the size counter for each field that did not exist before.
//
// ErrKeyOrFieldEmpty is returned when key, fields or values is empty or when
// fields and values differ in length. Any error from the write transaction is
// returned.
//
// The function passed to Batch may be executed more than once, so it reads
// the metadata inside the transaction on every run instead of mutating state
// captured from an earlier read.
func (e *boltHashEngine) HMSet(key []byte, fields, values [][]byte) error {
	if len(key) == 0 || len(fields) == 0 || len(values) == 0 || len(values) != len(fields) {
		return ErrKeyOrFieldEmpty
	}

	metaKey := RawKeyPrefix(key)

	return e.db.Batch(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("boltHashBucket"))

		meta := e.newHashMetaObj()
		if raw := b.Get(metaKey); raw != nil {
			stored, err := UnmarshalHashObj(raw)
			if err != nil {
				return err
			}
			if stored != nil {
				meta = stored
			}
		}

		for i := range fields {
			dataKey := e.RawHashDataKey(key, fields[i])
			// A field that is not stored yet grows the hash. A field repeated
			// within this call is found by Get on its second occurrence and
			// is therefore counted once.
			if b.Get(dataKey) == nil {
				meta.Size++
			}
			if err := b.Put(dataKey, values[i]); err != nil {
				return err
			}
		}

		// Persist the updated size after all pairs have been written.
		return b.Put(metaKey, MarshalHashObj(meta))
	})
}

// HSetNX stores value under field of the hash at key only when the field does
// not exist yet.
//
// It returns 1 when the field was created and 0 when it already existed (in
// which case nothing is written). ErrKeyOrFieldEmpty is returned when key,
// field or value is empty.
//
// The metadata record is read and updated inside the same write transaction
// as the field itself, so the size counter cannot be computed from a stale
// snapshot when several writers touch the same hash.
func (e *boltHashEngine) HSetNX(key, field, value []byte) (int64, error) {
	if len(key) == 0 || len(field) == 0 || len(value) == 0 {
		return 0, ErrKeyOrFieldEmpty
	}

	metaKey := RawKeyPrefix(key)
	dataKey := e.RawHashDataKey(key, field)

	var inserted int64
	err := e.db.Update(func(tx *bolt.Tx) error {
		inserted = 0
		b := tx.Bucket([]byte("boltHashBucket"))

		if b.Get(dataKey) != nil {
			// Field already present: leave it untouched.
			return nil
		}

		// New field: load the current metadata (or start from an empty
		// record) and bump the size counter.
		meta := e.newHashMetaObj()
		if raw := b.Get(metaKey); raw != nil {
			stored, err := UnmarshalHashObj(raw)
			if err != nil {
				return err
			}
			if stored != nil {
				meta = stored
			}
		}
		meta.Size++
		if err := b.Put(metaKey, MarshalHashObj(meta)); err != nil {
			return err
		}
		if err := b.Put(dataKey, value); err != nil {
			return err
		}
		inserted = 1
		return nil
	})
	if err != nil {
		return 0, err
	}
	return inserted, nil
}

// HStrLen returns the byte length of the value HGet yields for field of the
// hash at key. Errors from HGet are passed through with a length of 0.
func (e *boltHashEngine) HStrLen(key, field []byte) (int64, error) {
	value, err := e.HGet(key, field)
	if err == nil {
		return int64(len(value)), nil
	}
	return 0, err
}

// HIncrBy adds incrment to the decimal integer stored under field of the hash
// at key and returns the new value in its decimal text form.
//
// A field that does not exist is treated as 0 and created, which also bumps
// the hash's size counter. ErrKeyOrFieldEmpty is returned when key or field
// is empty. An error is returned when the stored value is not a number, when
// the addition would overflow int64, or when the write transaction fails; in
// every error case nothing is written.
func (e *boltHashEngine) HIncrBy(key, field []byte, incrment int64) ([]byte, error) {
	if len(key) == 0 || len(field) == 0 {
		return nil, ErrKeyOrFieldEmpty
	}

	metaKey := RawKeyPrefix(key)
	dataKey := e.RawHashDataKey(key, field)

	var ret []byte
	err := e.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("boltHashBucket"))

		var num int64
		cur := b.Get(dataKey)
		isNew := cur == nil
		if !isNew {
			if !util.IsNumber(string(cur)) {
				return errors.New("current value is not a number!")
			}
			// Parse as a full 64-bit integer so the result does not depend on
			// the platform's int width.
			parsed, err := strconv.ParseInt(string(cur), 10, 64)
			if err != nil {
				return err
			}
			num = parsed
		}

		// Signed addition wraps silently in Go; detect that explicitly.
		sum := num + incrment
		if (incrment > 0 && sum < num) || (incrment < 0 && sum > num) {
			return errors.New("increment or decrement would overflow")
		}

		if isNew {
			// Creating the field grows the hash: update its metadata in the
			// same transaction.
			meta := e.newHashMetaObj()
			if raw := b.Get(metaKey); raw != nil {
				stored, err := UnmarshalHashObj(raw)
				if err != nil {
					return err
				}
				if stored != nil {
					meta = stored
				}
			}
			meta.Size++
			if err := b.Put(metaKey, MarshalHashObj(meta)); err != nil {
				return err
			}
		}

		encoded := []byte(strconv.FormatInt(sum, 10))
		if err := b.Put(dataKey, encoded); err != nil {
			return err
		}

		// Hand the caller its own copy, independent of the buffer given to
		// bolt.
		ret = make([]byte, len(encoded))
		copy(ret, encoded)
		return nil
	})
	if err != nil {
		return nil, err
	}
	return ret, nil
}
